The below code is self explanatory and also needful descriptions are provided as part of comments against each line of code.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
public class MultiCoreParallelComputation {
public static void main(String[] args) {
//In compute intensive task the max number of threads[worker threads + main thread]
//created by user should be equal to the number of logical processors or 2*no of cores
//which is already considered by the default ForkJoinPool used by parallel stream.
//Thus no of core*2 = available logical processors.
System.out.println("Available logical processors in this system:
"+Runtime.getRuntime().availableProcessors());
//Thus parallelism value in below commonPool, is the no of worker threads available
//in this pool to execute the assigned task. In our case in below code, 8 or 20 tasks
//will be carried out by available threads[worker+main] in Fork Join Pool thus
//"worker+main" number of task will be executed as a badge at a time, then next badge
//will be executed and so on up to the end of all tasks.
System.out.println("FJP Parallelism default possibility info:
"+ForkJoinPool.commonPool());
//Declaring an Integer array. We can user bigger array by uncommenting next one also.
Integer[] arr = new Integer[] {1, 2, 3, 4, 5, 6, 7, 8};
//Integer[] arr = new Integer[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
//16, 17, 18, 19, 20};
//Converted the Integer array to List containing integer.
List<Integer> lst = Arrays.asList(arr);
//Created a synchronous/sequential stream of integers, then multiplied each integer
//value with 2 and printed each output element.Notice each multiplication task is done
//by as single main thread sequentially so result is also getting displayed
//sequentially. We can comment below line to witness parallel stream behavior next.
lst.stream().map(x->transformElements(x)).forEach(z->printElements(z));
//Created a parallel stream of integers, then multiplied each integer value with 2
//and printed each output element. Here each task of multiplication will be carried
//out by different thread from default ForkJoinPool on different available cores so
//outcome will be very fast but with random display or print. We can comment below
//line to witness same parallel stream behavior in orderly fashion next.
lst.parallelStream().map(x->transformElements(x)).forEach(z->printElements(z));
//If you wanna print element orderly after getting executed parallely through
//different threads and different available cores then use below forEachOrdered method
//to print. Remember forEachOrdered will guarranty ordering only if stream guarranties
//ordering as in our case stream is constructed out of List which is an order data
//structure. We can comment below line to witness usage of custom FJP next.
lst.parallelStream().map(x->transformElements(x)).forEachOrdered(z->printElements(z));
//Executing only printing task in a separate customized fork join pool having
//customized no of worker threads.
processElements(lst.parallelStream().map(x->transformElements(x)), 8);
}
private static void processElements(Stream<Integer> strm, int noOfThreads) {
ForkJoinPool fjp = new ForkJoinPool(noOfThreads);
fjp.submit(()->strm.forEachOrdered(z->printElements(z)));
fjp.shutdown();
try {
fjp.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static int transformElements(Integer ele) {
sleep(1200);
int res = ele*2;
System.out.println("Transforming: "+ele+" by : "+Thread.currentThread()+" to : "+res);
return res;
}
private static boolean sleep(int ms) {
try {
Thread.sleep(ms);
return true;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}
private static void printElements(int z) {
System.out.println("Printing by "+Thread.currentThread()+" : "+z);
}
}
Recent Comments